package com.process;

import canal.bean.RowData;
import com.alibaba.fastjson.JSON;
import com.async.AsyncOrderDetailRedisRequest;
import com.base.MySqlBaseETL;
import com.bean.OrderGoodsWideEntity;
import com.utils.HbaseUtil;
import com.utils.KafkaUtil;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
 * @Description: 订单明细表
 * @Author: Sky
 * @Times : 2021/8/15 16:48
 * <p>
 * 订单明细的实时ETL操作
 * * 1）将订单明细数据的事实表和维度表的数据关联后写入到hbase中
 * * 2）将拉宽后的订单数据保存到kafka中，供Druid进行实时摄取
 * <p>
 * 为什么要存储两份：
 * * 1）保存到hbase的数据，是明细数据，可以持久化的存储的，将来需要分析明细数据的时候，可以查询
 * * 2）保存到kafka的数据，是有时效性的，会根据kafka设置的保留策略过期删除数据，摄取到Druid以后，就不再是明细数据了
 * *    已经对明细数据进行了预聚合操作
 * * Druid=kylin+hbase
 */
public class OrderDetailDataETL {

    public static void main(String[] args) throws Exception {

        OrderDetailDataETL etl = new OrderDetailDataETL();

        etl.process();

    }

    /**
     * 根据业务抽取出来process方法，因为所有的ETL都有操作方法
     */
    public void process() throws Exception {
        /**
         * 实现步骤：
         * 1：获取canal中的订单明细数据，过滤出来订单明细表的数据，将CanalRowData转换成OrderGoods样例类
         * 2：将订单明细表的数据进行实时的拉宽操作
         * 3：将拉宽后的订单明细表的数据转换成json字符串写入到kafka集群，供Druid进行实时的摄取
         * 4：将拉宽后的订单明细表的数据写入到hbase中，供后续订单明细详情数据的查询
         */
        //1：获取canal中的订单明细数据，过滤出来订单明细表的数据，将CanalRowData转换成OrderGoods样例类。

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        MySqlBaseETL mysqlEtl = new MySqlBaseETL();
        //1.只过滤出来 foo_goods 表的日志，并进行转换
        DataStream<RowData> dwdOrders = mysqlEtl.KafkaConsummer("ods_shop", env);
//        dwdOrders.print(">>>>>");
        DataStream<RowData> orderGoodsCanalDataStream = dwdOrders.filter(new FilterFunction<RowData>() {
            @Override
            public boolean filter(RowData canalRowData) throws Exception {
                return canalRowData.getTableName().equals("foo_order_goods");
            }

        });
//        orderGoodsCanalDataStream.print(">>");
        //2：将订单明细表的数据进行实时的拉宽操作
        /**
         * orderGoodsCanalDataStream：要关联的数据源
         * AsyncOrderDetailRedisRequest：异步请求的对象
         * [1]：超时时间
         * [TimeUnit.SECONDS]：超时的时间单位
         * [100]：异步io的最大并发数
         */
        
        DataStream<OrderGoodsWideEntity> orderGoodsWideEntityDataStream = AsyncDataStream.unorderedWait(orderGoodsCanalDataStream, new AsyncOrderDetailRedisRequest(), 100, TimeUnit.SECONDS, 100);
//        orderGoodsWideEntityDataStream.print();
        //3：将拉宽后的订单明细表的数据转换成json字符串写入到kafka集群，供Druid进行实时的摄取
        DataStream<String> orderGoodsWideJsonDataStream = orderGoodsWideEntityDataStream.map(new MapFunction<OrderGoodsWideEntity, String>() {
            @Override
            public String map(OrderGoodsWideEntity orderGoodsWideEntity) throws Exception {
                return JSON.toJSONString(orderGoodsWideEntity);
            }

        });

        //4. 将转换后的json格式的订单明细数据写入到kafka集群 dwd_order_detail
        orderGoodsWideJsonDataStream.addSink(new RichSinkFunction<String>() {
            @Override
            public void invoke(String value, Context context) throws Exception {
                KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(KafkaUtil.kafkaCons);

                ProducerRecord<String, String> logs = new ProducerRecord<>("dwd_order_detail", null, value);

                kafkaProducer.send(logs);
            }
        });
        //5.将拉宽后的订单明细数据保存到hbase中.
        orderGoodsWideEntityDataStream.print("hb");
        orderGoodsWideEntityDataStream.addSink(new RichSinkFunction<OrderGoodsWideEntity>() {
            //定义Hbase的连接对象。
            private Connection connection;
            //定义Hbase表名
            private Table table;

            @Override
            public void open(Configuration parameters) throws Exception {
                connection = HbaseUtil.getHbasePool().getConnection();
                //初始化要写入的表名
                table = connection.getTable(TableName.valueOf("dwd_order_detail"));
            }

            //关闭资源
            @Override
            public void close() throws Exception {
                if (table != null) table.close();
                if (!connection.isClosed()) {
                    //将连接放回到连接池
                    HbaseUtil.getHbasePool().returnConnection(connection);
                }
            }

            //将数据一条条的写入到hbase.
            @Override
            public void invoke(OrderGoodsWideEntity orderGoodsWideEntity, Context context) {
                //构建Put对象
                //使用订单明细id作为rowkey。
                byte[] rowKey = Bytes.toBytes(orderGoodsWideEntity.getOrgId().toString());
                Put put = new Put(rowKey);
                //创建列族
                byte[] family = Bytes.toBytes("detail");

                byte[] ogIdCol = Bytes.toBytes("ogId");
                byte[] orderIdCol = Bytes.toBytes("orderId");
                byte[] goodsIdCol = Bytes.toBytes("goodsId");
                byte[] goodsNumCol = Bytes.toBytes("goodsNum");
                byte[] goodsPriceCol = Bytes.toBytes("goodsPrice");
                byte[] goodsNameCol = Bytes.toBytes("goodsName");
                byte[] shopIdCol = Bytes.toBytes("shopId");
                byte[] goodsThirdCatIdCol = Bytes.toBytes("goodsThirdCatId");
                byte[] goodsThirdCatNameCol = Bytes.toBytes("goodsThirdCatName");
                byte[] goodsSecondCatIdCol = Bytes.toBytes("goodsSecondCatId");
                byte[] goodsSecondCatNameCol = Bytes.toBytes("goodsSecondCatName");
                byte[] goodsFirstCatIdCol = Bytes.toBytes("goodsFirstCatId");
                byte[] goodsFirstCatNameCol = Bytes.toBytes("goodsFirstCatName");
                byte[] areaIdCol = Bytes.toBytes("areaId");
                byte[] shopNameCol = Bytes.toBytes("shopName");
                byte[] shopCompanyCol = Bytes.toBytes("shopCompany");
                byte[] cityIdCol = Bytes.toBytes("cityId");
                byte[] cityNameCol = Bytes.toBytes("cityName");
                byte[] regionIdCol = Bytes.toBytes("regionId");
                byte[] regionNameCol = Bytes.toBytes("regionName");

                put.addColumn(family, ogIdCol, Bytes.toBytes(orderGoodsWideEntity.getOrgId().toString()));
                put.addColumn(family, orderIdCol, Bytes.toBytes(orderGoodsWideEntity.getOrderId().toString()));
                put.addColumn(family, goodsIdCol, Bytes.toBytes(orderGoodsWideEntity.getGoodsId().toString()));
                put.addColumn(family, goodsNumCol, Bytes.toBytes(orderGoodsWideEntity.getGoodsNum().toString()));
                put.addColumn(family, goodsPriceCol, Bytes.toBytes(orderGoodsWideEntity.getGoodsPrice().toString()));
                put.addColumn(family, goodsNameCol, Bytes.toBytes(orderGoodsWideEntity.getGoodsName()));
                put.addColumn(family, shopIdCol, Bytes.toBytes(orderGoodsWideEntity.getShopId().toString()));
                put.addColumn(family, goodsThirdCatIdCol, Bytes.toBytes(orderGoodsWideEntity.getGoodsThirdCatId().toString()));
                put.addColumn(family, goodsThirdCatNameCol, Bytes.toBytes(orderGoodsWideEntity.getGoodsThirdCatName()));
                put.addColumn(family, goodsSecondCatIdCol, Bytes.toBytes(orderGoodsWideEntity.getGoodsSecondCatId()));
                put.addColumn(family, goodsSecondCatNameCol, Bytes.toBytes(orderGoodsWideEntity.getGoodsSecondCatName()));
                put.addColumn(family, goodsFirstCatIdCol, Bytes.toBytes(orderGoodsWideEntity.getGoodsFirstCatId()));
                put.addColumn(family, goodsFirstCatNameCol, Bytes.toBytes(orderGoodsWideEntity.getGoodsFirstCatName()));
                put.addColumn(family, areaIdCol, Bytes.toBytes(orderGoodsWideEntity.getAreaId()));
                put.addColumn(family, shopNameCol, Bytes.toBytes(orderGoodsWideEntity.getShopName()));
                put.addColumn(family, shopCompanyCol, Bytes.toBytes(orderGoodsWideEntity.getShopCompany()));
                put.addColumn(family, cityIdCol, Bytes.toBytes(String.valueOf(orderGoodsWideEntity.getCityId())));
                put.addColumn(family, cityNameCol, Bytes.toBytes(String.valueOf(orderGoodsWideEntity.getCityName())));
                put.addColumn(family, regionIdCol, Bytes.toBytes(String.valueOf(orderGoodsWideEntity.getRegionId())));
                put.addColumn(family, regionNameCol, Bytes.toBytes(String.valueOf(orderGoodsWideEntity.getRegionName())));
                //3：执行put操作
                try {
                    table.put(put);
                } catch (IOException e) {
                    e.printStackTrace();
                }


            }

        });

        env.execute();

    }


}
